package com.ysy.leader;

import io.netty.util.internal.StringUtil;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;

/**
 * @program: zookeeper-demo
 * @description: leader选举demo
 * @author: yeshiyuan
 * @create: 2019-06-17 18:55
 **/
public class LeaderChooseDemo implements Watcher{

    private static String host = "192.168.5.35:2181";

    private static String leaderNamePre = "leader_";

    private static String root = "/root";
    private String prePath = root + "/" + leaderNamePre;

    private static int sessionTimeout = 30000;

    private ZooKeeper zooKeeper;

    private boolean existFlag = true;

    private CountDownLatch stopLatch = new CountDownLatch(1);

    private String myNode;

    public LeaderChooseDemo() {
        try {
            zooKeeper = new ZooKeeper(host, sessionTimeout, this);
            Stat stat = zooKeeper.exists(root, false);
            if (stat == null) {
                zooKeeper.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }

    /**
      * @despriction：寻找根节点
      * @author  yeshiyuan
      * @created 2019/6/17 19:15
      * @params []
      * @return void
      */
    private void toBeLeader() {
        try {
            if (StringUtil.isNullOrEmpty(myNode)) {
                myNode = zooKeeper.create(prePath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            }
            List<String> childNodes = zooKeeper.getChildren(root, new Watcher() {
                public void process(WatchedEvent watchedEvent) {
                    if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
                        if (Event.EventType.NodeChildrenChanged == watchedEvent.getType()) {
                            if (existFlag) {
                                System.out.println("监听到主节点修改，需要竞选出新的leader");
                                toBeLeader();
                            }
                        }
                    }
                }
            });
            if (childNodes != null || !childNodes.isEmpty()) {
                if (myNode.equals(root + "/" + childNodes.get(0))) {
                    leader();
                } else {
                    following();
                }
            }
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private void toBeChildNode() throws InterruptedException, KeeperException {
        TimeUnit.SECONDS.sleep(1);
        System.out.println(myNode + "节点消失，需要竞选新的leader");
        existFlag = false;
        zooKeeper.close();
        stopLatch.countDown();
        //zooKeeper.delete(myNode, -1);


    }

    public void process(WatchedEvent watchedEvent) {
        if (watchedEvent.getState() == Event.KeeperState.Closed) {
            //System.out.println(myNode + "节点关闭连接");
        }
    }

    private void await() throws InterruptedException {
        stopLatch.await();
    }

    private void leader() throws KeeperException, InterruptedException {
        System.out.println(myNode + "成为leader");
        toBeChildNode();
    }

    private void following() {
        System.out.println(myNode + "成为子节点");
    }

    public static void main(String[] args) throws IOException {
        int threadNum = 4;
        final CyclicBarrier cyclicBarrier = new CyclicBarrier(threadNum, new Runnable() {
            public void run() {
                System.out.println("所有线程已启动，开始竞选主节点");
            }
        });
        for (int i = 0; i < threadNum; i++) {
            new Thread(new Runnable() {
                public void run() {
                    try {
                        LeaderChooseDemo demo = new LeaderChooseDemo();
                        cyclicBarrier.await();
                        demo.toBeLeader();
                        demo.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }
    }
}
